Image Retrieval by Hashing based COSFIRE Descriptors Approach
Introduction
This work develops a compact image hash code learning framework based on the COSFIRE filter banks for efficient similarity search and retrieval. Images are first passed through class-specific COSFIRE filters designed to activate on visually discriminative patterns. For comprehensive details on COSFIRE filters, we refer the reader to (Ndung’u et al. 2024) and (Azzopardi and Petkov 2012). These feature vectors are input to a multi-layer perceptron (MLP) network to learn binary hash codes that should capture the semantic similarity of the images, enabling efficient matching of hash codes of database images for fast retrieval. Our experiments on an image dataset demonstrate the potential of this straightforward approach for developing compact hash codes based rotation-invariant COSFIRE descriptors.
Tip
MLP is a type of artificial neural network consisting of multiple layers of neurons. The neurons in the MLP typically use nonlinear activation functions, allowing the network to learn complex patterns in data.
Data description
The input data consists of a set of descriptors extracted for each image using COSFIRE filters. Specifically, 93 COSFIRE filters are designed for each image class. From the work of (Ndung’u et al. 2024), it was discovered that the best accuracy required only 372 COSFIRE filters per class on the validation set (4 \(\times\) 93 descriptors). Therefore, we used the descriptors from the 93 COSFIRE filters per class as the basis for our image hashing experiments.
This process is applied to every image, resulting in a dataframe where each row contains the 372-element descriptor vector corresponding to an image. So each image is represented by a n-dimensional vector (n=372 in this case) of COSFIRE filter response values, which encode visual characteristics that help differentiate between classes.
Tip
The dataframe stores these image descriptor vectors, with each row representing a single image and each column representing the maximum response from one of the 372 total COSFIRE filters applied. This serves as the input data capturing image features that will be further transformed into a k-bit hash code for efficient similarity search and retrieval. The compact hash representation helps quickly locate the most similar images from the database given a new query image.
Code
import osos.environ['PYTHONHASHSEED'] ='python_envn'from scipy import statsfrom IPython.display import display, Markdown, HTMLfrom itables import init_notebook_modeinit_notebook_mode(all_interactive=True)from itables import showimport torch.nn as nnimport plotly.express as pximport pandas as pdimport plotly.graph_objects as goimport plotly.io as piopio.renderers.default ="notebook"import pandas as pdimport numpy as npimport refrom scipy import statsimport seaborn as snsimport matplotlib.pyplot as plt
This is the init_notebook_mode cell from ITables v2.0.1
(you should not see this message - is your notebook trusted?)
Let \(\Omega\) represent the COSFIRE descriptor embedding space for radio galaxy images. The objective is to discover a mapping function \(F : \Omega → {+1, −1}^{k}\) that translates the embedding space to a k-bit binary code space. This mapping should be learned in such a way that visually or semantically similar radio galaxy images are assigned binary codes that are close to each other, while dissimilar images are mapped to binary codes that are far apart in the binary code space.
where \(D_h(· , ·)\) denotes the Hamming distance between two binary vectors, and m > 0 is a margin threshold parameter.
In this loss function: y = 0 if they are similar, and y = 1 otherwise
Note
The first term encourages similar pairs to have small distances - punishes similar images mapped to different binary codes.
The second term encourages dissimilar pairs to have distances greater than the margin m punishes dissimilar images mapped to close binary codes when their Hamming distance falls below the margin threshold m. Only those dissimilar pairs having their distance within a radius are eligible to contribute to the loss function.
Suppose that there are N training pairs randomly selected from the training images \({(I_i,1, I_i,2, y_i)|i = 1, ..., N}\), our goal is to minimize the overall loss function:
Regularization: To reduce the discrepancy between Euclidean space and the Hamming space a commonly used relaxation scheme is to utilize sigmoid or tanh function to approximate the thresholding procedure. A regularizer is applied to help obtain real-valued network outputs (from sigmoid/tanh/relu etc) to approach the desired discrete binary-like values (e.g 0,1).
def DSHLoss(u, y,alpha, margin):# m > 0 is a margin. The margin defines a radius around X1,X2. Dissimilar pairs contribute to the loss# function only if their distance is within this radius m =2* margin y = y.int()# Create a duplicate y_label to form an N X N matrix#aim: # y = 0 if they are similar, and y = 1 otherwise y = y.unsqueeze(1).expand(len(y),len(y)) y_label = torch.ones_like(torch.empty(len(y), len(y))) y_label[y == y.t()] =0 dist = torch.cdist(u, u, p=2).pow(2) loss1 =0.5* (1- y_label) * dist loss2 =0.5* y_label * torch.clamp(m - dist, min=0) B1 = torch.norm(torch.abs(u) -1, p=1, dim=1)# create an N X N matrix to help in creating the pairs in the subsequent step B2 = B1.unsqueeze(1).expand(len(y), len(y))#add across the pairs - a transpose is required in order to have pair additions. loss3 = (B2 + B2.t()) * alpha minibatch_loss = torch.mean(loss1 + loss2 + loss3)return minibatch_loss
Model
We train a simple MLP network architecture. By optimizing this loss, the neural network learns to transform the input into a latent feature space that accentuates similarities and differences critical for distinguishing images effectively. The resulting model provides an end-to-end learning pipeline from raw image inputs to a k(36)-bit compact hash code amenable for efficient image retrieval and matching. Our experiments demonstrate the potential of this straightforward architecture and training approach for image hashing.
The model is built by creating a sequential stack of linear layers, batch normalization layers, and activation functions. It takes an input of size of 200 vector embeddings, passes it through two linear layers with 128 and 64 units respectively, applies batch normalization and Tanh activation function, it also includes a dropout layer with a probability of 0.3 for regularization, and finally outputs a tensor of size output_size after applying another Tanh activation function.
class CosfireNet(nn.Module):def__init__(self, input_size, bitsize, l1_reg, l2_reg):super(CosfireNet, self).__init__()self.l1_reg = l1_regself.l2_reg = l2_regself.hd = nn.Sequential( nn.Linear(input_size, 300), nn.BatchNorm1d(300), nn.Tanh(), nn.Linear(300, 200), nn.BatchNorm1d(200), nn.Tanh(), nn.Linear(200, bitsize), nn.Tanh() )def forward(self, x): regularization_loss =0.0for param inself.hd.parameters(): regularization_loss += torch.sum(torch.abs(param)) *self.l1_reg # L1 regularization regularization_loss += torch.sum(param **2) *self.l2_reg # L2 regularizationreturnself.hd(x), regularization_loss# Dataclass CosfireDataset(Dataset):def__init__(self, dataframe):self.data = torch.tensor(preprocessing.normalize(dataframe.iloc[:, :-1].values), dtype=torch.float32)self.labels = torch.tensor(dataframe.iloc[:, -1].values, dtype=torch.float32)def__len__(self):returnlen(self.data)def__getitem__(self, idx):returnself.data[idx], self.labels[idx]epochs =2learning_rate =0.01alpha =0.00001margin =36batch_size =48bitsize =72l1_reg =1e-8l2_reg =1e-08#List of Grid search parameters to iterate over# learning_rate_values = [ 0.1, 0.01]# alphas = [1e-03, 1e-5]# margin_values = [24, 36, 48] # batch_size_values = [32, 48, 64]# bitsize_values = [16, 24, 32, 40, 48, 56]# l2_reg_values = [0, 1e-08]# l1_reg_values = [0, 1e-08]# Train Valid & Test datatrain_df,valid_df,test_df = get_and_check_data_prev(data_path,data_path_valid,data_path_test,dic_labels) # DataLoader for training settrain_dataset = CosfireDataset(train_df)train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)# DataLoader for validation setvalid_dataset = CosfireDataset(valid_df)valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)model = CosfireNet(input_size, bitsize, l1_reg, l2_reg)optimizer = optim.RMSprop(model.parameters(), lr=learning_rate)scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.95)# Lists to store training and validation lossestrain_losses = []train_losses_eval = []val_losses = []# Variables to keep track of the best model# Train the loopfor _ in tqdm(range(epochs), desc='Training Progress', leave=True): model.train() total_train_loss =0.0for _, (train_inputs, labels) inenumerate(train_dataloader): optimizer.zero_grad() train_outputs, reg_loss = model(train_inputs) loss = DSHLoss(u = train_outputs, y=labels, alpha = alpha, margin = margin) + reg_loss loss.backward() optimizer.step() total_train_loss += loss.item() * train_inputs.size(0) scheduler.step()# Calculate average training loss average_train_loss = total_train_loss /len(train_dataloader) train_losses.append(average_train_loss)# Validation loop model.eval() total_val_loss =0.0with torch.no_grad():for val_inputs, val_labels in valid_dataloader: val_outputs, reg_loss = model(val_inputs) val_loss = DSHLoss(u = val_outputs, y=val_labels, alpha = alpha, margin = margin) + reg_loss total_val_loss += val_loss.item() * val_inputs.size(0)# Calculate average validation loss average_val_loss = total_val_loss /len(valid_dataloader) val_losses.append(average_val_loss)plt.plot(range(1, epochs +1), train_losses, label='Training Loss')plt.plot(range(1, epochs +1), val_losses, label='Validation Loss')plt.xlabel('Epoch')plt.ylabel('Loss')plt.title('Training and Validation Loss Curves')plt.legend()plt.savefig(output_dir +'/Train_valid_curves.png')plt.close()########################################################################################## Evaluate ###########################################################################model.eval()valid_dataset_eval = CosfireDataset(valid_df)valid_dataloader_eval = DataLoader(valid_dataset_eval, batch_size=batch_size, shuffle=False)train_dataset_eval = CosfireDataset(train_df)train_dataloader_eval = DataLoader(train_dataset_eval, batch_size=batch_size, shuffle=False)test_dataset = CosfireDataset(test_df)test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)# Lists to store predictionspredictions = []# Perform predictions on the train setwith torch.no_grad():for train_inputs, _ in tqdm(train_dataloader_eval, desc='Predicting', leave=True): train_outputs,_ = model(train_inputs) predictions.append(train_outputs.numpy())# Flatten the predictionsflat_predictions_train = [item for sublist in predictions for item in sublist]# Append predictions to the df_train DataFrametrain_df['predictions'] = flat_predictions_traintrain_df['label_name'] = train_df['label_code'].map(dic_labels_rev)train_df.to_csv(output_dir +'/train_df.csv',index =False)#################################################################predictions = []# Perform predictions on the valid setwith torch.no_grad():for valid_inputs, _ in tqdm(valid_dataloader_eval, desc='Predicting', leave=True): valid_outputs,_ = model(valid_inputs) predictions.append(valid_outputs.numpy())# Flatten the predictionsflat_predictions_valid = [item for sublist in predictions for item in sublist]# Append predictions to the valid_df DataFramevalid_df['predictions'] = flat_predictions_validvalid_df['label_name'] = valid_df['label_code'].map(dic_labels_rev)valid_df.to_csv(output_dir +'/valid_df.csv',index =False)###########################################################################Testing##########################################################################Perform predictions on the testing setpredictions_test = []with torch.no_grad():for test_inputs, _ in tqdm(test_dataloader, desc='Predicting', leave=True): test_outputs,_ = model(test_inputs) predictions_test.append(test_outputs.numpy())# Flatten the predictionsflat_predictions_test = [item for sublist in predictions_test for item in sublist]# Append predictions to the test_df DataFrametest_df['predictions'] = flat_predictions_testtest_df['label_name'] = test_df['label_code'].map(dic_labels_rev)test_df.to_csv(output_dir +'/test_df.csv',index =False)
Binarization and Mean Average Precision (mAP)
Mean average precision (mAP) is a commonly used evaluation metric in image retrieval tasks. It measures the average precision across all queries in the dataset. Precision is defined as the number of relevant images retrieved divided by the total number of images retrieved.
where AP represents the average precision of one query, with \(n\) being the total number of reference images, and \(\text{GTP}\) the total number of ground truth positives, \(\text{Precision}(i)\) is the precision of the top \(i\) ranked reference images and \(\text{Rel}(i)\) is an indicator variable that is 1 if the \(i\)th image is relevant and 0 otherwise. Finally, the mAP is computed as the average of all AP values obtained for all \(N_q\) query images.
Tip
In image retrieval, a query is typically an image, and the task is to retrieve a set of relevant images from a large dataset. The mAP metric is used to evaluate the performance of the retrieval system by comparing the retrieved images to a set of ground-truth relevant images for each query.
mAP takes into account both the relevance and the ranking of the retrieved images. A high mAP score indicates that the retrieval system is able to retrieve a high proportion of relevant images, and that these images are ranked highly in the retrieved set.
thresholds_abs_values = np.arange(-1, 1.2, 0.1)mAP_results_valid = []mAP_results_test = []for _,thresh inenumerate(thresholds_abs_values): mAP_valid_thresh,_,_, _, _, _,_ = mAP_values(train_df,valid_df,thresh = thresh, percentile =False) mAP_test_thresh,_,_, _, _, _,_ = mAP_values(train_df, test_df,thresh = thresh, percentile =False) mAP_results_valid.append(mAP_valid_thresh) mAP_results_test.append(mAP_test_thresh)#topkmap, cum_prec, cum_recall, cum_map = MapWithPR_values(train_df,valid_df,thresh = thresh, percentile = False)# Plottingdata_abs_values = {'mAP_valid': mAP_results_valid,'mAP_test': mAP_results_test,'threshold': thresholds_abs_values}df_thresh_abs_values = pd.DataFrame(data_abs_values)df_thresh_abs_values.to_csv(output_dir +'/results_data_abs_values.csv',index =False)# Find the index of the maximum mAP valuemax_map_index = df_thresh_abs_values['mAP_valid'].idxmax()# Retrieve the threshold corresponding to the maximum mAPthreshold_max_map_abs_values = df_thresh_abs_values.loc[max_map_index, 'threshold']#%%mAP_valid_abs_values,mAP_std,mAP_values1, r_binary, train_label, q_binary, valid_label = mAP_values(train_df,valid_df,thresh = threshold_max_map_abs_values, percentile =False)fig, ax = plt.subplots(figsize=(10/3, 3))plt.plot(thresholds_abs_values, mAP_results_valid,color='#ff7f0eff')plt.scatter(threshold_max_map_abs_values, mAP_valid_abs_values, color='#d62728ff', marker='o', s=25)plt.xlabel('Threshold')plt.ylabel('mAP')#plt.rc('font', family='Nimbus Roman')plt.savefig(output_dir +'/Maps_curves_abs_values.svg',format='svg', dpi=1200)plt.show()print('The optimal threshold from validation data is: ', np.round(threshold_max_map_abs_values,2))print('The Best Validation mAP is: ',np.round(mAP_valid_abs_values,2))
The optimal threshold from validation data is: 0.9
The Best Validation mAP is: 42.83
Now applying the same model (best model from the validation data) to the test data with the best threshold as per the validation data.
Code
mAP_test_abs_values,mAP_std,mAP_values1, train_binary, train_label, test_binary, valid_label = mAP_values(train_df, test_df,thresh = threshold_max_map_abs_values, percentile =False)topkmap, cum_prec, cum_recall, cum_map = MapWithPR_values(train_df,valid_df,thresh = threshold_max_map_abs_values, percentile =False)print('At the optimal threshold: ', np.round(threshold_max_map_abs_values,2))print('The Test mAP is: ',np.round(mAP_test_abs_values,2))
At the optimal threshold: 0.9
The Test mAP is: 44.62
Model Predictions overview
Exploring the quality of the model predictions by visualizing the distribution of the predicted values for each class in the test set. This can help identify any potential biases or patterns in the predictions that may be worth investigating further.
dic_labels_rev = { 2:'Bent',3:'Compact',0:'FRI',1: 'FRII' }train_df['labels'] = train_df['label_code'].map(dic_labels_rev)test_df['labels'] = test_df['label_code'].map(dic_labels_rev)# Create a figure and axisfig, ax = plt.subplots(figsize=(8, 6))# Iterate over label_code valuesfor _,label_code inenumerate(list(train_df['label_name'].value_counts().index)): dff = test_df.query(f'label_code == label_code') out_array_train = [] dd_train = np.array([out_array_train.extend(np.array(out)) for _, out inenumerate(dff.predictions)]) out_array_train = np.array(out_array_train)# Plot the KDE curve with a hue sns.kdeplot(out_array_train, label=label_code, ax=ax)# Set the x-axis limitsax.set_xlim(-1, 1)# Customize the plotax.set_xlabel('Value')ax.set_ylabel('Density')ax.grid(True)ax.legend()# Display the plotplt.show()plt.close()# df = pd.DataFrame([test_df.predictions[0]])# df.columns = ['hash_'+str(i) for i in range(36)]# df['label'] = test_df.labels[0]# for j in range(1,400,20):# df2 = pd.DataFrame([test_df.predictions[j]])# df2.columns = ['hash_'+str(i) for i in range(36)]# df2['label'] = test_df.labels[j]# df = pd.concat([df,df2])# display(Markdown(df.to_markdown(index = True)))
Code
# Create a figure and axisfig, ax = plt.subplots(figsize=(8, 6))# Iterate over label_code valuesfor _,label_code inenumerate(list(train_df['label_name'].value_counts().index)): dff = train_df.query(f'label_code == label_code') out_array_train = [] dd_train = np.array([out_array_train.extend(np.array(out)) for _, out inenumerate(dff.predictions)]) out_array_train = np.array(out_array_train)# Plot the KDE curve with a hue sns.kdeplot(out_array_train, label=label_code, ax=ax)# Set the x-axis limitsax.set_xlim(-1, 1)# Customize the plotax.set_xlabel('Value')ax.set_ylabel('Density')ax.grid(True)ax.legend()# Display the plotplt.show()plt.close()# df = pd.DataFrame([train_df.predictions[0]])# df.columns = ['hash_'+str(i) for i in range(36)]# df['label'] = train_df.labels[0]# for j in range(1,1180,50):# df2 = pd.DataFrame([train_df.predictions[j]])# df2.columns = ['hash_'+str(i) for i in range(36)]# df2['label'] = train_df.labels[j]# df = pd.concat([df,df2])# display(Markdown(df.to_markdown(index = True)))
Model comparisons with performance of DenseNet-161 model (Ndung’u et al. 2023). A similar study on image retrieval whic serves as a suitable reference for comparison, given that the data set utilized in the the paper is identical to the one used in this project.
Model comparisons while using different bit sizes. Derived from separate analyses.
Code
# df_top_mAP = df.sort_values(by="mAP", ascending=False).head(10)# display(Markdown(df_top_mAP.to_markdown(index = True)))data = {'bit': ['8_bit', '16_bit', '24_bit', '32_bit', '36_bit', '48_bit', '64_bit', '72_bit'],'mAP': [60.68, 60.12, 76.12, 91.76,93.06, 91.97, 91.41,90.77]}df = pd.DataFrame(data)# Create the bar plotplt.figure(figsize=(10, 6))plt.bar(df['bit'], df['mAP'])#plt.ylim(50,100)# Add value labels to the barsfor i, v inenumerate(df['mAP']):if df['bit'][i] in ['8_bit', '16_bit', '24_bit']: color ='red'else: color ='black' plt.text(i, v +0.5, str(v), ha='center', color=color)# Set the plot title and labelsplt.title('mAP Values for Different Bit sizes')plt.xlabel('Bit Size')plt.ylabel('mAP')# Display the plotplt.show()
References
Azzopardi, George, and Nicolai Petkov. 2012. “Trainable COSFIRE Filters for Keypoint Detection and Pattern Recognition.”IEEE Transactions on Pattern Analysis and Machine Intelligence 35 (2): 490–503.
Ndung’u, Steven, Trienko Grobler, Stefan J. Wijnholds, Dimka Karastoyanova, and George Azzopardi. 2023. “Deep Supervised Hashing for Fast Retrieval of Radio Image Cubes.” In 2023 XXXVth General Assembly and Scientific Symposium of the International Union of Radio Science (URSI GASS), 1–4. https://doi.org/10.23919/URSIGASS57860.2023.10265687.
Ndung’u, Steven, Trienko Grobler, Stefan J Wijnholds, Dimka Karastoyanova, and George Azzopardi. 2024. “Classification of radio galaxies with trainable COSFIRE filters.”Monthly Notices of the Royal Astronomical Society 530 (1): 783–94. https://doi.org/10.1093/mnras/stae821.